package com.ruc.dbiir.rest.controller;

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

import com.ruc.dbiir.rest.datasource.ReadKafka;
import com.ruc.dbiir.rest.datasource.ReadKafka2;
import com.ruc.dbiir.rest.datasource.ErrorConsumer;
import com.ruc.dbiir.rest.utils.Config;
import com.ruc.dbiir.rest.utils.ParseUtil;
import com.ruc.dbiir.rest.utils.RemoteExecuteCmd;

@Controller
@RequestMapping(value = "/highEchart")
public class HignChartsController
{

	// @Resource(name = "readKafka")
	// private ReadKafka readKafka;
	public static int x = 1;
	public static int y = 2;
	public static ReadKafka rk = null;
	public static ReadKafka2 rk2 = null;
	public static ErrorConsumer rk3 = null;

	@RequestMapping(value = "/getXY")
	@ResponseBody
	public Object getRealUsersByAno()
	{
		System.out.println("hhh");
		String jsonStr = "[" + x + "," + y + "]";
		x++;
		y++;
		return ParseUtil.toObject(jsonStr);
	}

	// /highEchart/startConsumer
	@RequestMapping(value = "/startConsumer")
	public void startConsumer()
	{
		new Thread(new Runnable()
		{
			public void run()
			{
				rk = new ReadKafka();
				System.out.println("consumer has been started!!!!");
				rk.consume();
			}
		}).run();

	}

	// /highEchart/startConsumer
	@RequestMapping(value = "/startConsumer2")
	public void startConsumer2()
	{

		new Thread(new Runnable()
		{
			public void run()
			{
				rk2 = new ReadKafka2();
				System.out.println("consumer222 has been started!!!!");
				rk2.consume();
			}
		}).run();

	}

	@RequestMapping(value = "/getNum")
	@ResponseBody
	public Object getNum()
	{
		// System.out.println("getNum===");
		String num = Config.NUMQUEUE.poll();
		if (num == null)
		{
			num = "0";
		}
		// 通过计算前后两次的差来求生成速率
		// int rate = Integer.parseInt(num) - Config.PRE_TUPLES;
		// Config.PRE_TUPLES = Integer.parseInt(num);
		// return ParseUtil.toObject(rate+"");
		return ParseUtil.toObject(num + "");
	}

	@RequestMapping(value = "/getErrorWater")
	@ResponseBody
	public Object getErrorWater()
	{
		// System.out.println("getErrorWater===");
		String errorMsg = Config.ERRORQUEUE.poll();
		if (errorMsg == null)
		{
			errorMsg = "nothing";
		}
		// 通过计算前后两次的差来求生成速率

		return ParseUtil.toObject(errorMsg);
	}
	
	/**
	 * storm 集群的配置修改请求
	 * author：mark   
	 * createTime：Jun 11, 2018 2:41:57 PM
	 * @param session
	 * @param code
	 * @return
	 * volume":"1000","worker":"1","spout":"1","bolt":"1","mode":"local"
	 */
	@RequestMapping(value = "/genconf")
	@ResponseBody
	public Object genConf( @RequestParam(value = "volume") String volume,  @RequestParam(value = "error_rate") String error_rate,
			@RequestParam(value = "worker") String worker,@RequestParam(value = "spout") String spout,
			@RequestParam(value = "bolt") String bolt,@RequestParam(value = "mode") String mode) {
		
		System.out.println(volume+error_rate+worker+spout+bolt+mode);
		
		RemoteExecuteCmd rec=new RemoteExecuteCmd(Config.IP, Config.USER, Config.PASSWD);  
		
        //执行命令  
		String cmd = Config.CMD_STORM_EXE_PARAMS+worker+" "+volume+" "+error_rate+" "+spout+" "+bolt;
		String returnStr = rec.execute(cmd);
        System.out.println(returnStr);  
		
		return returnStr;
	}
	
	@RequestMapping(value = "/killjob")
	@ResponseBody
	public Object killJob( @RequestParam(value = "name") String name) {
		
		System.out.println(name);
		
		RemoteExecuteCmd rec=new RemoteExecuteCmd(Config.IP, Config.USER, Config.PASSWD);  
        //执行命令  
		String cmd = Config.CMD_STORM_KILL+name;
		String returnStr = rec.execute(cmd);
        System.out.println(returnStr);  
		
		return returnStr;
	}
	
	@RequestMapping(value = "/test")
	@ResponseBody
	public Object test( @RequestParam(value = "name") String name) {
		
		System.out.println(name);
		
		RemoteExecuteCmd rec=new RemoteExecuteCmd(Config.IP, Config.USER, Config.PASSWD);  
        //执行命令  
		String cmd = "bash /data/rucer/mark/shell/storm_exe.sh";
		String returnStr = rec.execute(cmd);
        System.out.println(returnStr);  
		
		return returnStr;
	}

	
	@RequestMapping(value = "/getWater")
	@ResponseBody
	public Object getWater()
	{
		String errorMsg = Config.WATERQUEUE.poll();
		
		if (errorMsg == null)
		{
			errorMsg = "nothing";
		}

		return ParseUtil.toObject(errorMsg);
	}
	
}
